
fix(resource_manager): reinitialize consumer threads after os.fork()#1658

Merged
wochinge merged 11 commits into
langfuse:mainfrom
pyg410:main
Jul 2, 2026

@pyg410

@pyg410 pyg410 commented May 19, 2026

Contributor

What does this PR do?

When using Gunicorn with --preload, os.fork() copies memory but not threads
(POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html).
LangfuseResourceManager starts media upload and score ingestion consumer threads
on initialization, but these threads are not inherited by forked worker processes.
As a result, all media upload and score ingestion events are silently lost, and
calling flush() blocks forever on queue.join(), which ends in a Gunicorn worker timeout (SIGABRT).

Note: span export is already fork-safe via LangfuseSpanProcessor (BatchSpanProcessor
inheritance). This fix covers the remaining background threads managed by
LangfuseResourceManager.
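
The thread-loss behavior described above can be observed with a small POSIX-only sketch. The function name is hypothetical and is not part of the Langfuse SDK; it forks once and has the child report whether a thread started before the fork is still alive there.

```python
import os
import threading


def worker_alive_in_child() -> bool:
    """Fork once and report whether a pre-fork thread is alive in the child."""
    stop = threading.Event()
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: only the thread that called fork() exists here.
        os.close(read_fd)
        os.write(write_fd, b"1" if worker.is_alive() else b"0")
        os._exit(0)

    # Parent: the worker thread keeps running as before.
    os.close(write_fd)
    answer = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    stop.set()
    worker.join()
    return answer == b"1"
```

On CPython the child is expected to report the thread as not alive, which is why a queue that only that thread would drain is never emptied in the worker.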

Related PR

Changes:

  • Extract consumer thread initialization into _init_consumer_threads() for reuse
  • Store score_ingestion_client as an instance variable for access in _at_fork_reinit()
  • Add _at_fork_reinit() to reinitialize queues and consumer threads after fork
  • Register os.register_at_fork(after_in_child=...) using weakref.WeakMethod to
    avoid permanent strong references that would prevent garbage collection
  • Add _shutdown flag to skip reinitialization on already-stopped instances
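
A minimal sketch of how the pieces in this list might fit together, assuming a simplified manager with a single queue and a single consumer thread. `ManagerSketch` and `_consume` are illustrative names, not the SDK's actual code; only `os.register_at_fork` and `weakref.WeakMethod` are real standard-library APIs (POSIX only for the former).

```python
import os
import queue
import threading
import weakref


def _consume(q: queue.Queue) -> None:
    """Drain the queue until a None sentinel arrives."""
    while True:
        item = q.get()
        q.task_done()
        if item is None:
            return


class ManagerSketch:
    def __init__(self) -> None:
        self._shutdown = False
        self._init_consumer_threads()

        # WeakMethod keeps the at-fork registry from pinning this instance.
        weak_reinit = weakref.WeakMethod(self._at_fork_reinit)

        def _after_in_child() -> None:
            reinit = weak_reinit()
            if reinit is not None:  # instance has not been garbage collected
                reinit()

        os.register_at_fork(after_in_child=_after_in_child)

    def _init_consumer_threads(self) -> None:
        # A fresh queue avoids replaying pre-fork events in every worker.
        self._queue = queue.Queue()
        self._consumer = threading.Thread(
            target=_consume, args=(self._queue,), daemon=True
        )
        self._consumer.start()

    def _at_fork_reinit(self) -> None:
        if self._shutdown:  # never revive an instance that was shut down
            return
        self._init_consumer_threads()

    def shutdown(self) -> None:
        self._shutdown = True
        self._queue.put(None)
        self._consumer.join()
```

The consumer target is a module-level function rather than a bound method so that the running thread does not itself hold a strong reference to the manager.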

Type of change

  • Bug fix

Verification

List the main commands you ran:

 uv run pytest tests/unit/test_resource_manager.py -v

Checklist

  • I self-reviewed the diff using code_review.md.
  • I added or updated tests for behavior changes.
  • I updated docs, examples, or .env.template if needed.
  • I did not hand-edit generated files; if generated files changed, I used the upstream regeneration path.
  • I did not commit secrets or credentials.

Greptile Summary

This PR fixes silent data loss and flush() deadlocks in Gunicorn --preload deployments by registering an os.register_at_fork(after_in_child=...) handler that reinitializes the media upload and score ingestion consumer threads in each forked worker.

  • Consumer thread startup is extracted into _init_consumer_threads() so it can be called both during initial setup and after fork; queues are intentionally recreated in the child to prevent duplicating pre-fork events across workers.
  • A WeakMethod-based fork handler prevents the os.register_at_fork registry from holding a strong reference that would block garbage collection, and a _shutdown flag guards against reinitializing already-torn-down instances.
  • The fix deliberately omits recreating httpx.Client / LangfuseClient instances after fork (acknowledged in comments), leaving inherited connection-pool FDs that could cause initial connection errors in workers; the after_in_child callback also has no exception handling, so a thread-creation failure propagates out of os.fork() and crashes the worker rather than degrading gracefully.

Confidence Score: 3/5

The core fix is sound and addresses a real production issue, but the after_in_child callback has no exception handling, meaning a thread-creation failure would crash the Gunicorn worker rather than falling back gracefully.

The fork-reinitialization logic is correct for the happy path: queues are replaced, old dead thread references are discarded, new consumer threads are started, and the WeakMethod approach is appropriate. The main gap is that _init_consumer_threads() is called directly inside the after_in_child callback without a try/except — if the OS refuses to create a thread at worker startup time, the exception surfaces from os.fork() and kills the worker instead of allowing it to continue with degraded telemetry. The inherited httpx connection pool FDs are an acknowledged limitation that could surface as transient connection errors in workers.

langfuse/_client/resource_manager.py warrants a second look around the _at_fork_reinit method — specifically exception handling and whether the HTTP clients need to be recreated for full fork safety.
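
The exception-handling suggestion could look roughly like the sketch below. `ReinitSketch`, `failing_start`, and the `start_thread` parameter are hypothetical stand-ins so the failure path can be exercised without exhausting real OS resources; the actual method bodies in the SDK are not shown in this thread.

```python
import logging
import threading

logger = logging.getLogger("fork_reinit_sketch")


def failing_start() -> None:
    """Stand-in for Thread.start() failing under resource exhaustion."""
    raise RuntimeError("can't start new thread")


class ReinitSketch:
    def __init__(self, start_thread=None) -> None:
        # start_thread is an injection point that exists only for this demo.
        self._start_thread = start_thread or self._default_start
        self.consumers_running = False

    @staticmethod
    def _default_start() -> None:
        threading.Thread(target=lambda: None, daemon=True).start()

    def _init_consumer_threads(self) -> None:
        self._start_thread()
        self.consumers_running = True

    def _at_fork_reinit(self) -> None:
        try:
            self._init_consumer_threads()
        except Exception:
            # Log and continue: the worker runs without telemetry consumers.
            logger.exception("consumer thread reinitialization failed after fork")
            self.consumers_running = False
```

With this shape a failed thread start leaves the worker running with `consumers_running` set to `False`, which callers could check before enqueueing work.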

Sequence Diagram

sequenceDiagram
    participant Master as Master Process (Gunicorn)
    participant Fork as os.fork()
    participant Parent as Parent Process
    participant Child as Child Worker Process

    Master->>Master: Langfuse() → LangfuseResourceManager.__new__()
    Master->>Master: _initialize_instance()
    Master->>Master: _init_consumer_threads() → start media + score threads
    Master->>Master: "os.register_at_fork(after_in_child=weak_reinit_lambda)"
    Master->>Fork: os.fork()
    Fork-->>Parent: returns child PID (old threads alive)
    Fork-->>Child: returns 0 (threads NOT inherited)
    Child->>Child: after_in_child: weak_reinit() → _at_fork_reinit()
    Child->>Child: _shutdown? → False → proceed
    Child->>Child: _init_consumer_threads()
    Child->>Child: new _media_upload_queue (fresh Queue)
    Child->>Child: new _score_ingestion_queue (fresh Queue)
    Child->>Child: start fresh MediaUploadConsumer threads
    Child->>Child: start fresh ScoreIngestionConsumer thread
    Child->>Child: child ready to handle requests
    Parent->>Parent: continues with original threads unchanged
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
langfuse/_client/resource_manager.py:344-378
**Unhandled exception in `after_in_child` callback crashes Gunicorn worker**

`_at_fork_reinit` calls `_init_consumer_threads`, which calls `Thread.start()`. If the OS refuses to create a thread, e.g. due to resource exhaustion (`RuntimeError: can't start new thread`), the exception propagates through the `after_in_child` callback chain and surfaces as an exception from `os.fork()` in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping `_init_consumer_threads()` in a `try/except Exception` and logging the error would allow the child to continue (without consumer threads) instead of crashing.

### Issue 2 of 3
langfuse/_client/resource_manager.py:366-374
**Inherited `httpx.Client` connection pool is not fork-safe**

`self.httpx_client` and `self._score_ingestion_client` (which holds a reference to `self.httpx_client` as its `session`) are not recreated in `_at_fork_reinit`. After `os.fork()`, the child inherits duplicated file descriptors from the parent's connection pool. The parent and child share open TCP connections at the OS level; if the parent closes or recycles a connection, the child's pool may try to reuse a dead FD, causing connection errors on the first outbound requests from each worker. The existing comment acknowledges this but marks it as a future concern — since this PR is specifically targeting fork-safety, recreating the clients in the child would complete the fix.

### Issue 3 of 3
langfuse/_client/resource_manager.py:80-82
**Class-level `_lock` (RLock) not guarded around fork**

`LangfuseResourceManager._lock` is a `threading.RLock`. If `os.fork()` occurs while another thread holds `_lock` (i.e., during a concurrent `__new__` call), the child inherits the locked mutex with no thread to release it. Any subsequent `LangfuseResourceManager.__new__` call in the child — including indirect calls — would deadlock. The standard remedy is to register a `before` handler (`os.register_at_fork(before=cls._lock.acquire)`) and release it in both `after_in_parent` and `after_in_child`. In a typical Gunicorn `--preload` deployment this lock is already released before workers fork, so this is a latent risk rather than an immediate failure for the primary use case.
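
A variant of the remedy above, closer to what later comments in this thread describe (resetting the class-level lock in the child), is to swap in a fresh lock after fork instead of acquiring and releasing around it. The sketch below assumes a class-level `RLock`; `SingletonSketch` and `_reset_lock_after_fork` are hypothetical names.

```python
import os
import threading


class SingletonSketch:
    _lock = threading.RLock()

    @classmethod
    def _reset_lock_after_fork(cls) -> None:
        # The thread that held the old lock does not exist in the child,
        # so nothing could ever release it. Replace it with a fresh lock.
        cls._lock = threading.RLock()


# POSIX only: runs in every child created by os.fork().
os.register_at_fork(after_in_child=SingletonSketch._reset_lock_after_fork)
```

Replacing the lock is only reasonable in the child, where no other thread can still be relying on the old one; the parent keeps its original lock untouched.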

Reviews (1): Last reviewed commit: "fix(resource_manager): reinitialize cons..." | Re-trigger Greptile

Greptile also left 2 inline comments on this PR.

@claude claude Bot left a comment

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

Comment thread langfuse/_client/resource_manager.py
Comment thread langfuse/_client/resource_manager.py Outdated
…r reinitialization

Signed-off-by: 박영규 <pyg410@naver.com>

@wochinge wochinge left a comment

Collaborator

Great contribution @pyg410 !
A few smaller comments - otherwise looking very good already 🙌🏻

Comment thread langfuse/_client/resource_manager.py
Comment thread langfuse/_client/resource_manager.py Outdated
pyg410 and others added 4 commits May 30, 2026 06:14
…ock in child process

Signed-off-by: 박영규 <pyg410@naver.com>
…ocess-unsafe socket sharing

Signed-off-by: 박영규 <pyg410@naver.com>
…d of replacing it

Signed-off-by: 박영규 <pyg410@naver.com>
@wochinge

wochinge commented Jun 9, 2026

Collaborator

@pyg410 One more thing: I tried Gunicorn with preload on my Mac and I get a segfault.
Apparently we're doing too much heavy work in after_in_child, most importantly the HTTP client reinitialization.

I also had a look at how others do it, and it seems they all lazily reinitialize the connection.
So I think we need to do the same; otherwise forked usage remains broken.

…ault

Signed-off-by: 박영규 <pyg410@naver.com>
@pyg410

pyg410 commented Jun 15, 2026

Contributor Author

> @pyg410 One more thing: I tried Gunicorn with preload on my Mac and I get a segfault. Apparently we're doing too much heavy work in after_in_child, most importantly the HTTP client reinitialization.
>
> I also had a look at how others do it, and it seems they all lazily reinitialize the connection. So I think we need to do the same; otherwise forked usage remains broken.

@wochinge
Agreed on lazy reinitialization; fixed in the latest commit (68327b7).
_at_fork_reinit() now only does lightweight work that is safe to call in after_in_child: it replaces the class lock and swaps in fresh empty queues.
All heavy work (httpx.Client creation, thread spawning) is deferred to _ensure_post_fork_initialized(), which is called lazily on the first add_score_task() / add_trace_task() in the child process.
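
The split between cheap at-fork work and deferred heavy work might be sketched as follows. This is a simplified illustration with one queue and one consumer; `LazyManagerSketch` and its attributes are assumptions, and HTTP client creation is left out entirely.

```python
import queue
import threading


class LazyManagerSketch:
    def __init__(self) -> None:
        self._init_lock = threading.Lock()
        self._queue: queue.Queue = queue.Queue()
        self._consumer = None
        self._needs_init = True
        self.processed = []

    def _at_fork_reinit(self) -> None:
        # Cheap work only: fresh lock, fresh empty queue, and a flag.
        self._init_lock = threading.Lock()
        self._queue = queue.Queue()
        self._consumer = None
        self._needs_init = True

    def _ensure_post_fork_initialized(self) -> None:
        if not self._needs_init:
            return
        with self._init_lock:
            if self._needs_init:  # re-check under the lock
                self._consumer = threading.Thread(
                    target=self._consume, args=(self._queue,), daemon=True
                )
                self._consumer.start()
                self._needs_init = False

    def _consume(self, q: queue.Queue) -> None:
        while True:
            item = q.get()
            self.processed.append(item)
            q.task_done()

    def add_score_task(self, event) -> None:
        # Heavy initialization happens on first use, not inside the fork hook.
        self._ensure_post_fork_initialized()
        self._queue.put(event)

    def flush(self) -> None:
        self._queue.join()
```

The double check around `_needs_init` keeps two request threads in the same worker from both starting a consumer on their first call.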

@wochinge

Collaborator

@pyg410 Thanks! Just tried it and it still breaks.
The issue is actually not the heavy work but the implementation of getproxies on macOS: https://docs.python.org/3/library/urllib.request.html

I checked how others do it, and the simplest option seems to be:

  1. Revert the last commit (we don't actually need it, and it just makes things more complex)
  2. In _at_fork_reinit, add the following guard:
      if sys.platform == "darwin":
          # urllib proxy discovery uses macOS system APIs that are unsafe after fork.
          # Limit Python's documented workaround to the child process.
          os.environ["no_proxy"] = "*"
          os.environ["NO_PROXY"] = "*"

This deactivates proxy discovery on macOS and makes the PR fully functional.

Other libraries' issues on this:

@wochinge

wochinge commented Jul 2, 2026

Collaborator

@claude review

Comment thread langfuse/_client/resource_manager.py
@wochinge

wochinge commented Jul 2, 2026

Collaborator

@claude review

@claude claude Bot left a comment

The prior concern about Langfuse.api / Langfuse.async_api being cached on __init__ (bypassing the post-fork reinit) has been correctly addressed by converting them to @property delegates on the resource manager — test coverage (assert client.api is rm.api) confirms it. Only a minor ordering nit remains in _init_consumer_threads (see inline).

Extended reasoning...

Overview

This PR reinitializes the Langfuse resource manager after os.fork() so Gunicorn --preload workers don't lose their consumer threads and hang on flush(). It has evolved substantially since first submission: it now (1) recreates internally-managed httpx.Client / API clients on fork while preserving caller-supplied clients, (2) resets the class-level RLock to avoid a fork-inherited stuck-mutex deadlock, (3) works around the macOS SystemConfiguration post-fork segfault via no_proxy="*", and (4) exposes Langfuse.api / Langfuse.async_api as @property delegates so post-fork reinit is visible to callers.

Prior feedback resolved

My previous review flagged that Langfuse.__init__ cached self.api = self._resources.api as plain attributes, so under --preload all langfuse.api.* calls (e.g. get_prompt, dataset ops) would keep routing through the parent's httpx.Client even after reinit. The current diff converts both to properties (client.py:422-448), and the added test_at_fork_reinit_recreates_httpx_client_by_default asserts client.api is rm.api after the reinit — the gap is closed.
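
The difference between caching the attribute and delegating through a property can be illustrated with stand-in classes. None of the names below are the SDK's real classes; plain `object()` instances play the role of API clients.

```python
class ResourcesSketch:
    """Stand-in for the resource manager that owns the API client."""

    def __init__(self) -> None:
        self.api = object()

    def _at_fork_reinit(self) -> None:
        self.api = object()  # fresh client for the child process


class CachedClient:
    """Snapshots the client once, so it goes stale after a reinit."""

    def __init__(self, resources: ResourcesSketch) -> None:
        self.api = resources.api


class DelegatingClient:
    """Looks the client up on every access, so it follows a reinit."""

    def __init__(self, resources: ResourcesSketch) -> None:
        self._resources = resources

    @property
    def api(self):
        return self._resources.api
```

After `_at_fork_reinit()`, `CachedClient.api` still points at the pre-fork object while `DelegatingClient.api` returns the new one, which mirrors the `client.api is rm.api` assertion mentioned above.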

Security risks

No new attack surface: the code paths added are internal lifecycle management (thread startup, client recreation, lock reset). The no_proxy="*" write only takes effect when no proxy env vars are already configured, and it's confined to the darwin branch — so users who rely on HTTPS_PROXY in workers are not silently deproxied.
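
A guard with the behavior described here might look like the sketch below. The function name and the exact list of environment variables checked are assumptions, since the merged code is not quoted in this thread; the mapping and platform are parameters only so the logic can be exercised without touching the real environment.

```python
import os
import sys

# Assumed set of variables that count as "a proxy is already configured".
_PROXY_VARS = ("http_proxy", "https_proxy", "all_proxy", "no_proxy")


def disable_macos_proxy_discovery(environ=os.environ, platform=sys.platform) -> bool:
    """Set no_proxy="*" on macOS unless proxy settings already exist.

    Returns True when the environment mapping was modified.
    """
    if platform != "darwin":
        return False
    for name in _PROXY_VARS:
        if environ.get(name) or environ.get(name.upper()):
            return False  # respect explicit user configuration
    environ["no_proxy"] = "*"
    environ["NO_PROXY"] = "*"
    return True
```

Called from the child-side fork hook, this would leave Linux workers and explicitly proxied macOS workers unchanged.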

Level of scrutiny

High. Fork-safety plus lock reset plus platform-conditional env-var mutation is inherently subtle: correctness depends on ordering (lock before shutdown check), on which state is class-level vs instance-level, and on the behavior of weakref.WeakMethod under GC. The test suite is well-designed — it exercises the stuck-lock scenario explicitly and asserts the outer Langfuse client sees fresh api objects — but real Gunicorn --preload behavior on Linux and macOS is worth spot-checking before merge given that this exact codepath has already regressed once (the macOS segfault). I'd defer to a human familiar with the deployment topology.

Other factors

Only remaining finding is a nit about ordering inside _init_consumer_threads: if the media consumer.start() loop raises (e.g. RuntimeError: can't start new thread), the score queue never gets reset and the child inherits the parent's queue with non-zero unfinished_tasks, causing the exact flush() hang this PR prevents. It's guarded by the outer try/except but the safety net still leaves stale queue state — a one-line hoist of the score-queue reset above the media loop closes it. Not a blocker.
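
The suggested hoist can be sketched with injected start functions standing in for `consumer.start()`. The queue attribute names follow the sequence diagram earlier in this review; the class, the constructor parameters, and the helper functions are hypothetical.

```python
import queue


def refuse_to_start(_q: queue.Queue) -> None:
    """Stand-in for a consumer whose start() fails."""
    raise RuntimeError("can't start new thread")


def noop_start(_q: queue.Queue) -> None:
    """Stand-in for a consumer that starts normally."""
    return None


class ConsumerSetupSketch:
    def __init__(self, start_media, start_score) -> None:
        self._start_media = start_media
        self._start_score = start_score
        self._media_upload_queue: queue.Queue = queue.Queue()
        self._score_ingestion_queue: queue.Queue = queue.Queue()

    def _init_consumer_threads(self) -> None:
        # Reset both queues first, so a failure while starting the media
        # consumers cannot leave a stale score queue behind.
        self._media_upload_queue = queue.Queue()
        self._score_ingestion_queue = queue.Queue()
        self._start_media(self._media_upload_queue)
        self._start_score(self._score_ingestion_queue)
```

Even when the media start raises, the score queue has already been replaced, so its `unfinished_tasks` counter is back at zero and a later `join()` has nothing inherited to wait for.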

Comment thread langfuse/_client/resource_manager.py
@wochinge wochinge merged commit 12373d9 into langfuse:main Jul 2, 2026
15 of 18 checks passed
@wochinge

wochinge commented Jul 2, 2026

Collaborator

Thanks for the fix @pyg410 ! 🚀 Will do a release with it now.
